Oozie Manual -- A Workflow System for Hadoop

Key points:

  1. A workflow application is a DAG that coordinates the following types of actions: Hadoop, Pig, and sub-workflows.
  2. Flow control in a workflow is handled with decision, fork and join nodes; cycles are not supported.
  3. Actions and decisions can be parameterized with job properties, action outputs (i.e. Hadoop counters) and file information (file exists, file size, etc.). Formal parameters are expressed in the workflow definition as ${VAR} variables.
  4. A workflow application is a ZIP file containing the workflow definition (an XML file) plus all the files needed to run its actions: JAR files for Map/Reduce jobs, shells for streaming Map/Reduce jobs, native libraries, Pig scripts, and other resource files.
  5. Before running a workflow job, the corresponding workflow application must be deployed in Oozie.
  6. Deploying workflow applications and running workflow jobs can be done via command line tools, the WS API or the Java API.
  7. Monitoring the system and workflow jobs can be done via the web console, command line tools, the WS API or the Java API.
  8. When submitting a workflow job, a set of properties resolving all the formal parameters in the workflow definition must be provided. This set of properties is a Hadoop configuration.
  9. The possible states of a workflow job are: PREP, RUNNING, SUSPENDED, SUCCEEDED, KILLED and FAILED.
  10. On action start failure in a workflow job, depending on the type of failure, Oozie attempts automatic recovery, requests a manual retry, or fails the workflow job.
  11. HTTP callback notifications can be generated on action start/end/failure events and on workflow end/failure events.
  12. In case of workflow failure, the workflow job can be resubmitted, skipping the previously completed actions. Before resubmission, the workflow application could be updated with a patch to fix a problem in the workflow application code.

Workflow Definition

A workflow definition is a DAG of control flow nodes (start, end, decision, fork, join, kill) and action nodes (map-reduce, pig, etc.), with nodes connected by transition arrows. The workflow definition language is XML-based and is called hPDL (Hadoop Process Definition Language).

Fork and Join example: four Map/Reduce jobs executed in three steps (one job, then two jobs in parallel, then one final job); the output of each step is used as the input of the next.

Required parameters of the workflow job:

jobtracker : JobTracker HOST:PORT
namenode : NameNode HOST:PORT
input : input directory
output : output directory
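These parameters are typically provided in the job properties used to submit the workflow job; a minimal job.properties sketch (the host names and paths below are placeholders):

```properties
jobtracker=foo:8021
namenode=hdfs://bar:8020
input=/usr/foo/input-data
output=/usr/foo/output-data
```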

<workflow-app name='example-forkjoinwf' xmlns="uri:oozie:workflow:0.1">
    <start to='firstjob' />
    <action name="firstjob">
        <map-reduce>
            <job-tracker>${jobtracker}</job-tracker>
            <name-node>${namenode}</name-node>
            <configuration>
                <property>
                    <name>mapred.mapper.class</name>
                    <value>org.apache.hadoop.example.IdMapper</value>
                </property>
                <property>
                    <name>mapred.reducer.class</name>
                    <value>org.apache.hadoop.example.IdReducer</value>
                </property>
                <property>
                    <name>mapred.map.tasks</name>
                    <value>1</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>${input}</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>/usr/foo/${wf:id()}/temp1</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="fork" />
        <error to="kill" />
     </action>
    <fork name='fork'>
        <path start='secondjob' />
        <path start='thirdjob' />
    </fork>
    <action name="secondjob">
        <map-reduce>
            <job-tracker>${jobtracker}</job-tracker>
            <name-node>${namenode}</name-node>
            <configuration>
                <property>
                    <name>mapred.mapper.class</name>
                    <value>org.apache.hadoop.example.IdMapper</value>
                </property>
                <property>
                    <name>mapred.reducer.class</name>
                    <value>org.apache.hadoop.example.IdReducer</value>
                </property>
                <property>
                    <name>mapred.map.tasks</name>
                    <value>1</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>/usr/foo/${wf:id()}/temp1</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>/usr/foo/${wf:id()}/temp2</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="join" />
        <error to="kill" />
    </action>
    <action name="thirdjob">
        <map-reduce>
            <job-tracker>${jobtracker}</job-tracker>
            <name-node>${namenode}</name-node>
            <configuration>
                <property>
                    <name>mapred.mapper.class</name>
                    <value>org.apache.hadoop.example.IdMapper</value>
                </property>
                <property>
                    <name>mapred.reducer.class</name>
                    <value>org.apache.hadoop.example.IdReducer</value>
                </property>
                <property>
                    <name>mapred.map.tasks</name>
                    <value>1</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>/usr/foo/${wf:id()}/temp1</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>/usr/foo/${wf:id()}/temp3</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="join" />
        <error to="kill" />
    </action>
    <join name='join' to='finaljob'/>
    <action name="finaljob">
        <map-reduce>
        <job-tracker>${jobtracker}</job-tracker>
        <name-node>${namenode}</name-node>
            <configuration>
                <property>
                    <name>mapred.mapper.class</name>
                    <value>org.apache.hadoop.example.IdMapper</value>
                </property>
                <property>
                    <name>mapred.reducer.class</name>
                    <value>org.apache.hadoop.example.IdReducer</value>
                </property>
                <property>
                    <name>mapred.map.tasks</name>
                    <value>1</value>
                </property>
                <property>
                    <name>mapred.input.dir</name>
                    <value>/usr/foo/${wf:id()}/temp2,/usr/foo/${wf:id()}/temp3
                    </value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>${output}</value>
                </property>
            </configuration>
        </map-reduce>
    <ok to="end" />
    <error to="kill" />
    </action>
    <kill name="kill">
        <message>Map/Reduce failed, error message[${wf:errorMessage()}]</message>
    </kill>
    <end name='end'/>
</workflow-app>

Workflow Nodes

Nodes are divided into control flow nodes and action nodes.

Node names and transitions must conform to the pattern [a-zA-Z][\-_a-zA-Z0-9]*, of up to 20 characters.

1. Control Flow Nodes

Control flow nodes define the start and end of a workflow (start, end, kill) and control its execution path (decision, fork, join).

Start

Example:

<workflow-app name="foo-wf" xmlns="uri:oozie:workflow:0.1">
    ...
    <start to="firstHadoopJob"/>
    ...
</workflow-app>

The to attribute names the first workflow node to execute.

End

<workflow-app name="foo-wf" xmlns="uri:oozie:workflow:0.1">
    ...
    <end name="end"/>
</workflow-app>

The name attribute is the name of the transition target that ends the workflow job. When a workflow job reaches the end node, it finishes in SUCCEEDED state.

Kill

<workflow-app name="foo-wf" xmlns="uri:oozie:workflow:0.1">
    ...
    <kill name="killBecauseNoInput">
        <message>Input unavailable</message>
    </kill>
    ...
</workflow-app>

A kill node has no transition elements. When a workflow job reaches a kill node, it finishes in KILLED state, and the message element is logged as the kill reason.

Decision

A decision node behaves like a switch-case statement; JSP EL expressions that resolve to boolean values are used as the case predicates.

<workflow-app name="foo-wf" xmlns="uri:oozie:workflow:0.1">
    ...
    <decision name="mydecision">
        <switch>
            <case to="reconsolidatejob">
              ${fs:fileSize(secondjobOutputDir) gt 10 * GB}
            </case>
            <case to="rexpandjob">
              ${fs:fileSize(secondjobOutputDir) lt 100 * MB}
            </case>
            <case to="recomputejob">
              ${ hadoop:counters('secondjob')[RECORDS][REDUCE_OUT] lt 1000000 }
            </case>
            <default to="end"/>
        </switch>
    </decision>
    ...
</workflow-app>

Fork and Join

fork and join nodes must be used in pairs:

<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
    ...
    <fork name="forking">
        <path start="firstparalleljob"/>
        <path start="secondparalleljob"/>
    </fork>
    <action name="firstparalleljob">
        <map-reduce>
            <job-tracker>foo:8021</job-tracker>
            <name-node>bar:8020</name-node>
            <job-xml>job1.xml</job-xml>
        </map-reduce>
        <ok to="joining"/>
        <error to="kill"/>
    </action>
    <action name="secondparalleljob">
        <map-reduce>
            <job-tracker>foo:8021</job-tracker>
            <name-node>bar:8020</name-node>
            <job-xml>job2.xml</job-xml>
        </map-reduce>
        <ok to="joining"/>
        <error to="kill"/>
    </action>
    <join name="joining" to="nextaction"/>
    ...
</workflow-app>

By default, Oozie validates any forking in a workflow to make sure it is valid and will not lead to incorrect behavior or instability, and it blocks the workflow from being submitted otherwise. If you are sure a particular workflow will behave correctly, you can disable fork/join validation for that workflow by setting oozie.wf.validate.ForkJoin to false in its job.properties file, or disable it for all workflows by setting oozie.validate.ForkJoin to false in the oozie-site.xml file. Disabling this validation is determined by the AND of both of these properties, so it will be disabled if either or both are set to false and only enabled if both are set to true (or not specified).

2. Action Nodes

All computation/processing tasks triggered by an action node are remote to Oozie; no workflow-application-specific computation or processing task is executed within Oozie itself.

All computation/processing tasks triggered by an action node are executed asynchronously by Oozie, with the exception of the fs action. Oozie detects task completion in two ways: callbacks and polling.

When a computation task is started by Oozie, Oozie provides a unique callback URL to the task, and the task should invoke that URL when it completes. If the task fails to invoke the callback URL, or cannot be given a callback URL at all, Oozie polls the task for completion.

Actions have two possible transitions: ok and error. When a task exits in error, it must provide error-code and error-message information to Oozie. Each action node type must clearly define all the error codes it can produce.

Action recovery: once an action has started successfully it is not retried; it is re-executed only if it failed to start. The assumption is that the external system executing the task is resilient enough to recover jobs once they have started. Depending on the type of failure, there are different recovery mechanisms: 1. For transient failures, Oozie retries after a pre-defined time interval. 2. For non-transient failures, Oozie suspends the workflow job until it is manually or programmatically resumed. 3. For error failures that retries do not resolve, Oozie performs the action's error transition.

1. Map-Reduce Action

The counters of the Hadoop job and its exit status (FAILED, KILLED or SUCCEEDED) must be available to the workflow job after the job ends; this information can be used in decision nodes or in the configuration of later nodes.

Configuration properties are loaded in the order streaming -> job-xml -> configuration; values specified later override earlier ones. The Hadoop mapred.job.tracker and fs.default.name properties must not appear in the job-xml file or in the inline configuration.

Use file and archive elements to make files and archives available to the job;

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.1">
    ...
    <action name="[NODE-NAME]">
        <map-reduce>
            <job-tracker>[JOB-TRACKER]</job-tracker>
            <name-node>[NAME-NODE]</name-node>
            <prepare>
                <delete path="[PATH]"/>
                ...
                <mkdir path="[PATH]"/>
                ...
            </prepare>
            <streaming>
                <mapper>[MAPPER-PROCESS]</mapper>
                <reducer>[REDUCER-PROCESS]</reducer>
                <record-reader>[RECORD-READER-CLASS]</record-reader>
                <record-reader-mapping>[NAME=VALUE]</record-reader-mapping>
                ...
                <env>[NAME=VALUE]</env>
                ...
            </streaming>
            <!-- Either streaming or pipes can be specified for an action, not both -->
            <pipes>
                <map>[MAPPER]</map>
                <reduce>[REDUCER]</reduce>
                <inputformat>[INPUTFORMAT]</inputformat>
                <partitioner>[PARTITIONER]</partitioner>
                <writer>[OUTPUTFORMAT]</writer>
                <program>[EXECUTABLE]</program>
            </pipes>
            <job-xml>[JOB-XML-FILE]</job-xml>
            <configuration>
                <property>
                    <name>[PROPERTY-NAME]</name>
                    <value>[PROPERTY-VALUE]</value>
                </property>
                ...
            </configuration>
            <file>[FILE-PATH]</file>
            ...
            <archive>[FILE-PATH]</archive>
            ...
        </map-reduce>
        <ok to="[NODE-NAME]"/>
        <error to="[NODE-NAME]"/>
    </action>
    ...
</workflow-app>

Writing external stats can be turned on or off by setting oozie.action.external.stats.write to true or false in the configuration of the workflow.xml file; the default is false. In the mapper and reducer commands of a streaming job, any % character must be URL-encoded as %25;

<workflow-app name="foo-wf" xmlns="uri:oozie:workflow:0.1">
    ...
    <action name="myfirstHadoopJob">
        <map-reduce>
            <job-tracker>foo:8021</job-tracker>
            <name-node>bar:8020</name-node>
            <prepare>
                <delete path="hdfs://foo:8020/usr/tucu/output-data"/>
            </prepare>
            <job-xml>/myfirstjob.xml</job-xml>
            <configuration>
                <property>
                    <name>mapred.input.dir</name>
                    <value>/usr/tucu/input-data</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>/usr/tucu/output-data</value>
                </property>
                <property>
                    <name>mapred.reduce.tasks</name>
                    <value>${firstJobReducers}</value>
                </property>
                <property>
                    <name>oozie.action.external.stats.write</name>
                    <value>true</value>
                </property>
            </configuration>
        </map-reduce>
        <ok to="myNextAction"/>
        <error to="errorCleanup"/>
    </action>
    ...
</workflow-app>

Streaming Example:

<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
    ...
    <action name="firstjob">
        <map-reduce>
            <job-tracker>foo:8021</job-tracker>
            <name-node>bar:8020</name-node>
            <prepare>
                <delete path="${output}"/>
            </prepare>
            <streaming>
                <mapper>/bin/bash testarchive/bin/mapper.sh testfile</mapper>
                <reducer>/bin/bash testarchive/bin/reducer.sh</reducer>
            </streaming>
            <configuration>
                <property>
                    <name>mapred.input.dir</name>
                    <value>${input}</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>${output}</value>
                </property>
                <property>
                    <name>stream.num.map.output.key.fields</name>
                    <value>3</value>
                </property>
            </configuration>
            <file>/users/blabla/testfile.sh#testfile</file>
            <archive>/users/blabla/testarchive.jar#testarchive</archive>
        </map-reduce>
        <ok to="end"/>
        <error to="kill"/>
    </action>
  ...
</workflow-app>

Pipes Example:

<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.1">
    ...
    <action name="firstjob">
        <map-reduce>
            <job-tracker>foo:8021</job-tracker>
            <name-node>bar:8020</name-node>
            <prepare>
                <delete path="${output}"/>
            </prepare>
            <pipes>
                <program>bin/wordcount-simple#wordcount-simple</program>
            </pipes>
            <configuration>
                <property>
                    <name>mapred.input.dir</name>
                    <value>${input}</value>
                </property>
                <property>
                    <name>mapred.output.dir</name>
                    <value>${output}</value>
                </property>
            </configuration>
            <archive>/users/blabla/testarchive.jar#testarchive</archive>
        </map-reduce>
        <ok to="end"/>
        <error to="kill"/>
    </action>
  ...
</workflow-app>

Pig Action

(See Pig Action Extension)

FS (HDFS) Action

<workflow-app name="[WF-DEF-NAME]" xmlns="uri:oozie:workflow:0.5">
    ...
    <action name="[NODE-NAME]">
        <fs>
            <delete path='[PATH]'/>
            ...
            <mkdir path='[PATH]'/>
            ...
            <move source='[SOURCE-PATH]' target='[TARGET-PATH]'/>
            ...
            <chmod path='[PATH]' permissions='[PERMISSIONS]' dir-files='false' />
            ...
            <touchz path='[PATH]' />
            ...
            <chgrp path='[PATH]' group='[GROUP]' dir-files='false' />
        </fs>
        <ok to="[NODE-NAME]"/>
        <error to="[NODE-NAME]"/>
    </action>
    ...
</workflow-app>


<workflow-app name="sample-wf" xmlns="uri:oozie:workflow:0.5">
    ...
    <action name="hdfscommands">
         <fs>
            <delete path='hdfs://foo:8020/usr/tucu/temp-data'/>
            <mkdir path='archives/${wf:id()}'/>
            <move source='${jobInput}' target='archives/${wf:id()}/processed-input'/>
            <chmod path='${jobOutput}' permissions='-rwxrw-rw-' dir-files='true'><recursive/></chmod>
            <chgrp path='${jobOutput}' group='testgroup' dir-files='true'><recursive/></chgrp>
        </fs>
        <ok to="myotherjob"/>
        <error to="errorcleanup"/>
    </action>
    ...
</workflow-app>

Sub-workflow Action

Java Action

The main method of the Java class must not call System.exit(int n); doing so makes the action take the error transition, and the exit code is ignored.

When running on a secure cluster, the Hadoop delegation token must be propagated, as in the following code:

// propagate delegation related props from launcher job to MR job
if (System.getenv("HADOOP_TOKEN_FILE_LOCATION") != null) {
    jobConf.set("mapreduce.job.credentials.binary", System.getenv("HADOOP_TOKEN_FILE_LOCATION"));
}
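A minimal java action sketch (the element names follow the schema in Appendix 1; the class name, argument and transition targets are placeholders):

```xml
<action name="myJavaAction">
    <java>
        <job-tracker>${jobtracker}</job-tracker>
        <name-node>${namenode}</name-node>
        <main-class>org.example.MyMain</main-class>
        <arg>${input}</arg>
        <!-- capture-output makes the properties written by the main class
             available to later nodes via wf:actionData() -->
        <capture-output/>
    </java>
    <ok to="nextAction"/>
    <error to="kill"/>
</action>
```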

Workflow Parameterization

Workflow definitions are parameterized using JSP Expression Language (EL).

Properties that are valid Java identifiers, [A-Za-z_][0-9A-Za-z_]*, can be referenced as '${NAME}' variables.

Properties that are not valid Java identifiers, such as 'job.tracker', can be accessed via the String wf:conf(String name) function.
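For illustration, both access styles might appear in a workflow's property values; a sketch (the property names are hypothetical):

```xml
<property>
    <name>example.queue</name>
    <value>${queueName}</value>                  <!-- valid Java identifier -->
</property>
<property>
    <name>example.tracker</name>
    <value>${wf:conf('job.tracker')}</value>     <!-- dotted name, via wf:conf() -->
</property>
```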

Basic EL functions:

String firstNotNull(String value1, String value2)   
String concat(String s1, String s2) 
String appendAll(String src, String append, String delimiter)
String trim(String s)   
String urlEncode(String s)  
String timestamp()  
String toJsonStr(Map)   
String toPropertiesStr(Map) 
String toConfigurationStr(Map)
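These functions can be composed inside property values; a sketch under the assumption that `output` and `override.dir` are job properties (both names are hypothetical):

```xml
<property>
    <name>example.output.dir</name>
    <!-- appends a timestamp to the output directory -->
    <value>${concat(concat(output, '/'), timestamp())}</value>
</property>
<property>
    <name>example.working.dir</name>
    <!-- falls back to the literal default when override.dir is not set -->
    <value>${firstNotNull(wf:conf('override.dir'), '/usr/foo/default')}</value>
</property>
```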

Workflow EL functions:

String wf:id()                    the workflow job ID
String wf:name()                  the workflow application name
String wf:appPath()               the workflow application path
String wf:conf(String name)       the value of the given workflow job configuration property, or an empty string if undefined
String wf:user()                  the user name that started the current workflow job
String wf:group()                 the group/ACL of the current workflow job
String wf:callback(String stateVar)

It returns the callback URL for the current workflow action node; stateVar can be a valid exit state (OK or ERROR) for the action, or a token to be replaced with the exit state by the remote system executing the task.

String wf:transition(String node)

It returns the transition taken by the specified workflow action node, or an empty string if the action has not been executed or has not completed yet.

String wf:lastErrorNode()

It returns the name of the last workflow action node that exited with an ERROR exit state, or an empty string if no action has exited with ERROR state in the current workflow job.

String wf:errorCode(String node)

It returns the error code for the specified action node, or an empty string if the action node has not exited with ERROR state.

Each type of action node must define its complete error code list.

String wf:errorMessage(String message)

It returns the error message for the specified action node, or an empty string if the action node has not exited with ERROR state.

The error message can be useful for debugging and notification purposes.

int wf:run()

It returns the run number for the current workflow job, normally 0 unless the workflow job is re-run, in which case it indicates the current run.

Map wf:actionData(String node)

This function is only applicable to action nodes that produce output data on completion.

The output data is in Java Properties format, and via this EL function it is available as a Map.

String wf:actionExternalId(String node)

It returns the external ID for an action node, or an empty string if the action has not been executed or has not completed yet.

String wf:actionTrackerUri(String node)

It returns the tracker URI for an action node, or an empty string if the action has not been executed or has not completed yet.

String wf:actionExternalStatus(String node)

It returns the external status for an action node, or an empty string if the action has not been executed or has not completed yet.

Hadoop EL Constants

RECORDS: Hadoop record counters group name.
MAP_IN: Hadoop mapper input records counter name.
MAP_OUT: Hadoop mapper output records counter name.
REDUCE_IN: Hadoop reducer input records counter name.
REDUCE_OUT: Hadoop reducer output records counter name.
GROUPS: Hadoop mapper/reducer record groups counter name.

Hadoop EL Functions:

Hadoop Jobs EL function:

 wf:actionData() 
        <arg> ${wf:actionData("pig-node")["hadoopJobs"]}</arg>

HDFS EL Functions

boolean fs:exists(String path)
boolean fs:isDir(String path)
long fs:dirSize(String path)
long fs:fileSize(String path)
long fs:blockSize(String path)
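These functions are typically used in decision predicates; a sketch assuming an `input` parameter as in the earlier examples (node names are hypothetical):

```xml
<decision name="check-input">
    <switch>
        <!-- proceed only when the input directory exists and is non-empty -->
        <case to="firstjob">${fs:exists(input) and fs:dirSize(input) gt 0}</case>
        <default to="killBecauseNoInput"/>
    </switch>
</decision>
```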

HCatalog EL Functions

boolean hcat:exists(String uri)

Workflow Notifications

If the oozie.wf.action.notification.url property is present in the workflow job properties, Oozie will make a notification to that URL for each action of the job. The URL may contain the following tokens, which are replaced with actual values when the notification is made: $jobId : the workflow job ID; $nodeName : the name of the workflow node; $status : if the action has not completed yet, it contains the action status prefixed with 'S:'; if the action has ended, it contains the action transition prefixed with 'T:'.
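For example, the notification URL template would be set in the job properties; a sketch (the endpoint is a placeholder):

```properties
oozie.wf.action.notification.url=http://myserver:8080/oozie-notify?job=$jobId&node=$nodeName&status=$status
```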

Workflow Application Deployment

A workflow application is installed in an HDFS directory with the following layout:

    - /workflow.xml
    - /config-default.xml
    |
    - /lib/ (*.jar;*.so)

If JAR files or native libraries are not in the lib/ directory, they can be specified with file elements in the map-reduce or pig action. An uber JAR can also be used by setting the oozie.mapreduce.uber.jar configuration property; this requires oozie.action.mapreduce.uber.jar.enable to be set in oozie-site.xml.

<action name="mr-node">
    <map-reduce>
        <job-tracker>${jobTracker}</job-tracker>
        <name-node>${nameNode}</name-node>
        <configuration>
            <property>
                <name>oozie.mapreduce.uber.jar</name>
                <value>${nameNode}/user/${wf:user()}/my-uber-jar.jar</value>
            </property>
        </configuration>
    </map-reduce>
    <ok to="end"/>
    <error to="fail"/>
</action>

Workflow Job Lifecycle

A workflow job can be in one of the following states: PREP, RUNNING, SUSPENDED, SUCCEEDED, KILLED and FAILED. The valid state transitions are:

--> PREP
PREP --> RUNNING | KILLED
RUNNING --> SUSPENDED | SUCCEEDED | KILLED | FAILED
SUSPENDED --> RUNNING | KILLED

Workflow Job Recovery (re-run)

The re-run configuration must specify either oozie.wf.rerun.skip.nodes, a comma-separated list of node names to skip, or oozie.wf.rerun.failnodes (true / false) to re-run only the failed nodes.
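A job.properties sketch for a re-run (the node names are placeholders; use one property or the other):

```properties
# Re-run only the actions that failed:
oozie.wf.rerun.failnodes=true
# Or, instead, explicitly skip nodes that already completed (comma-separated):
# oozie.wf.rerun.skip.nodes=firstjob,secondjob
```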

Oozie Web Services API

Command Line Tools

Web UI Console

Global Configurations

Global configuration can be defined in a global section at the beginning of the workflow.xml file, and may contain job-xml, configuration, job-tracker and name-node information. If any of these are redefined later in an action, the action's values override the global ones;

<workflow-app xmlns="uri:oozie:workflow:0.4" name="wf-name">
<global>
   <job-tracker>${job-tracker}</job-tracker>
   <name-node>${name-node}</name-node>
   <job-xml>job1.xml</job-xml>
   <configuration>
        <property>
            <name>mapred.job.queue.name</name>
            <value>${queueName}</value>
        </property>
    </configuration>
</global>
...
</workflow-app>

Appendix 1: Oozie XML Schema, Version 0.5

<!-- added comment -->: comments in this form were added in this document and are not part of the official schema

<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:workflow="uri:oozie:workflow:0.5"
           elementFormDefault="qualified" targetNamespace="uri:oozie:workflow:0.5">
    <xs:element name="workflow-app" type="workflow:WORKFLOW-APP"/>
    <!-- Simple types -->
    <xs:simpleType name="IDENTIFIER">
        <xs:restriction base="xs:string">
            <xs:pattern value="([a-zA-Z_]([\-_a-zA-Z0-9])*){1,39}"/>
        </xs:restriction>
    </xs:simpleType>
    <!-- Complex types -->
    <xs:complexType name="WORKFLOW-APP">
        <xs:sequence>

            <!-- Basic elements: parameters, global, credentials, start, end -->
            <xs:element name="parameters" type="workflow:PARAMETERS" minOccurs="0" maxOccurs="1"/>
            <xs:element name="global" type="workflow:GLOBAL" minOccurs="0" maxOccurs="1"/>
            <xs:element name="credentials" type="workflow:CREDENTIALS" minOccurs="0" maxOccurs="1"/>
            <xs:element name="start" type="workflow:START" minOccurs="1" maxOccurs="1"/>
            <!-- Node types: decision, fork, join, kill, action -->
            <xs:choice minOccurs="0" maxOccurs="unbounded">
                <xs:element name="decision" type="workflow:DECISION" minOccurs="1" maxOccurs="1"/>
                <xs:element name="fork" type="workflow:FORK" minOccurs="1" maxOccurs="1"/>
                <xs:element name="join" type="workflow:JOIN" minOccurs="1" maxOccurs="1"/>
                <xs:element name="kill" type="workflow:KILL" minOccurs="1" maxOccurs="1"/>
                <xs:element name="action" type="workflow:ACTION" minOccurs="1" maxOccurs="1"/>
            </xs:choice>
            <xs:element name="end" type="workflow:END" minOccurs="1" maxOccurs="1"/>
            <xs:any namespace="uri:oozie:sla:0.1 uri:oozie:sla:0.2" minOccurs="0" maxOccurs="1"/>
        </xs:sequence>
        <xs:attribute name="name" type="xs:string" use="required"/>
    </xs:complexType>

    <!-- Complex type PARAMETERS: name, value, description -->
    <xs:complexType name="PARAMETERS">
        <xs:sequence>
            <xs:element name="property" minOccurs="1" maxOccurs="unbounded">
                <xs:complexType>
                    <xs:sequence>
                        <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/>
                        <xs:element name="value" minOccurs="0" maxOccurs="1" type="xs:string"/>
                        <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/>
                    </xs:sequence>
                </xs:complexType>
            </xs:element>
        </xs:sequence>
    </xs:complexType>
    <!-- Complex type GLOBAL: job-tracker, name-node, job-xml, configuration -->
    <xs:complexType name="GLOBAL">
        <xs:sequence>
            <xs:element name="job-tracker" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <xs:element name="name-node" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <xs:element name="job-xml" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
            <xs:element name="configuration" type="workflow:CONFIGURATION" minOccurs="0" maxOccurs="1"/>
        </xs:sequence>
    </xs:complexType>

    <!-- Complex types: START: to; END: name; DECISION: name, switch (sequence) -->
    <xs:complexType name="START">
        <xs:attribute name="to" type="workflow:IDENTIFIER" use="required"/>
    </xs:complexType>
    <xs:complexType name="END">
        <xs:attribute name="name" type="workflow:IDENTIFIER" use="required"/>
    </xs:complexType>
    <xs:complexType name="DECISION">
        <xs:sequence>
            <xs:element name="switch" type="workflow:SWITCH" minOccurs="1" maxOccurs="1"/>
        </xs:sequence>
        <xs:attribute name="name" type="workflow:IDENTIFIER" use="required"/>
    </xs:complexType>
    <xs:element name="switch" type="workflow:SWITCH"/>
    <xs:complexType name="SWITCH">
        <xs:sequence>
            <xs:sequence>
                <xs:element name="case" type="workflow:CASE" minOccurs="1" maxOccurs="unbounded"/>
                <xs:element name="default" type="workflow:DEFAULT" minOccurs="1" maxOccurs="1"/>
            </xs:sequence>
        </xs:sequence>
    </xs:complexType>
    <xs:complexType name="CASE">
        <xs:simpleContent>
            <xs:extension base="xs:string">
                <xs:attribute name="to" type="workflow:IDENTIFIER" use="required"/>
            </xs:extension>
        </xs:simpleContent>
    </xs:complexType>
    <!-- Complex types: DEFAULT: to; FORK_TRANSITION: start; FORK: name, path (sequence) -->
    <xs:complexType name="DEFAULT">
        <xs:attribute name="to" type="workflow:IDENTIFIER" use="required"/>
    </xs:complexType>
    <xs:complexType name="FORK_TRANSITION">
        <xs:attribute name="start" type="workflow:IDENTIFIER" use="required"/>
    </xs:complexType>
    <xs:complexType name="FORK">
        <xs:sequence>
            <xs:element name="path" type="workflow:FORK_TRANSITION" minOccurs="2" maxOccurs="unbounded"/>
        </xs:sequence>
        <xs:attribute name="name" type="workflow:IDENTIFIER" use="required"/>
    </xs:complexType>
    <!-- Complex types: JOIN: name, to; KILL: message, name; ACTION_TRANSITION: to -->
    <xs:complexType name="JOIN">
        <xs:attribute name="name" type="workflow:IDENTIFIER" use="required"/>
        <xs:attribute name="to" type="workflow:IDENTIFIER" use="required"/>
    </xs:complexType>
    <xs:element name="kill" type="workflow:KILL"/>
    <xs:complexType name="KILL">
        <xs:sequence>
            <xs:element name="message" type="xs:string" minOccurs="1" maxOccurs="1"/>
        </xs:sequence>
        <xs:attribute name="name" type="workflow:IDENTIFIER" use="required"/>
    </xs:complexType>
    <xs:complexType name="ACTION_TRANSITION">
        <xs:attribute name="to" type="workflow:IDENTIFIER" use="required"/>
    </xs:complexType>

    <!-- Action elements: map-reduce, pig, sub-workflow, fs, java -->
    <xs:element name="map-reduce" type="workflow:MAP-REDUCE"/>
    <xs:element name="pig" type="workflow:PIG"/>
    <xs:element name="sub-workflow" type="workflow:SUB-WORKFLOW"/>
    <xs:element name="fs" type="workflow:FS"/>
    <xs:element name="java" type="workflow:JAVA"/>
    <xs:complexType name="ACTION">
        <xs:sequence>
            <xs:choice minOccurs="1" maxOccurs="1">
                <xs:element name="map-reduce" type="workflow:MAP-REDUCE" minOccurs="1" maxOccurs="1"/>
                <xs:element name="pig" type="workflow:PIG" minOccurs="1" maxOccurs="1"/>
                <xs:element name="sub-workflow" type="workflow:SUB-WORKFLOW" minOccurs="1" maxOccurs="1"/>
                <xs:element name="fs" type="workflow:FS" minOccurs="1" maxOccurs="1"/>
                <xs:element name="java" type="workflow:JAVA" minOccurs="1" maxOccurs="1"/>
                <xs:any namespace="##other" minOccurs="1" maxOccurs="1"/>
            </xs:choice>
            <xs:element name="ok" type="workflow:ACTION_TRANSITION" minOccurs="1" maxOccurs="1"/>
            <xs:element name="error" type="workflow:ACTION_TRANSITION" minOccurs="1" maxOccurs="1"/>
            <xs:any namespace="uri:oozie:sla:0.1 uri:oozie:sla:0.2" minOccurs="0" maxOccurs="1"/>
        </xs:sequence>
        <xs:attribute name="name" type="workflow:IDENTIFIER" use="required"/>
        <xs:attribute name="cred" type="xs:string"/>
        <xs:attribute name="retry-max" type="xs:string"/>
        <xs:attribute name="retry-interval" type="xs:string"/>
    </xs:complexType>

    <!-- Complex type MAP-REDUCE: job-tracker, name-node, prepare, streaming/pipes, job-xml, configuration, file, archive -->
    <xs:complexType name="MAP-REDUCE">
        <xs:sequence>
            <xs:element name="job-tracker" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <xs:element name="name-node" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <xs:element name="prepare" type="workflow:PREPARE" minOccurs="0" maxOccurs="1"/>
            <xs:choice minOccurs="0" maxOccurs="1">
                <xs:element name="streaming" type="workflow:STREAMING" minOccurs="0" maxOccurs="1"/>
                <xs:element name="pipes" type="workflow:PIPES" minOccurs="0" maxOccurs="1"/>
            </xs:choice>
            <xs:element name="job-xml" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
            <xs:element name="configuration" type="workflow:CONFIGURATION" minOccurs="0" maxOccurs="1"/>
            <xs:element name="file" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
            <xs:element name="archive" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
        </xs:sequence>
    </xs:complexType>

    <!-- Complex type PIG: job-tracker, name-node, prepare, job-xml, configuration, script, param, argument, file, archive -->
    <xs:complexType name="PIG">
        <xs:sequence>
            <xs:element name="job-tracker" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <xs:element name="name-node" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <xs:element name="prepare" type="workflow:PREPARE" minOccurs="0" maxOccurs="1"/>
            <xs:element name="job-xml" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
            <xs:element name="configuration" type="workflow:CONFIGURATION" minOccurs="0" maxOccurs="1"/>
            <xs:element name="script" type="xs:string" minOccurs="1" maxOccurs="1"/>
            <xs:element name="param" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
            <xs:element name="argument" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
            <xs:element name="file" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
            <xs:element name="archive" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
        </xs:sequence>
    </xs:complexType>

    <!-- Complex type SUB-WORKFLOW: app-path, propagate-configuration, configuration -->
    <xs:complexType name="SUB-WORKFLOW">
        <xs:sequence>
            <xs:element name="app-path" type="xs:string" minOccurs="1" maxOccurs="1"/>
            <xs:element name="propagate-configuration" type="workflow:FLAG" minOccurs="0" maxOccurs="1"/>
            <xs:element name="configuration" type="workflow:CONFIGURATION" minOccurs="0" maxOccurs="1"/>
        </xs:sequence>
    </xs:complexType>

    <!-- Complex type FS: name-node, job-xml, configuration; delete, mkdir, move, chmod, touchz, chgrp -->
    <xs:complexType name="FS">
        <xs:sequence>
            <xs:element name="name-node" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <xs:element name="job-xml" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
            <xs:element name="configuration" type="workflow:CONFIGURATION" minOccurs="0" maxOccurs="1"/>
            <xs:choice minOccurs="0" maxOccurs="unbounded">
                <xs:element name="delete" type="workflow:DELETE"/>
                <xs:element name="mkdir" type="workflow:MKDIR"/>
                <xs:element name="move" type="workflow:MOVE"/>
                <xs:element name="chmod" type="workflow:CHMOD"/>
                <xs:element name="touchz" type="workflow:TOUCHZ"/>
                <xs:element name="chgrp" type="workflow:CHGRP"/>
            </xs:choice>
        </xs:sequence>
    </xs:complexType>
    <!-- JAVA definition: job-tracker; name-node; prepare; job-xml; configuration; main-class; java-opts; java-opt; arg; file; archive; capture-output -->
    <xs:complexType name="JAVA">
        <xs:sequence>
            <xs:element name="job-tracker" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <xs:element name="name-node" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <xs:element name="prepare" type="workflow:PREPARE" minOccurs="0" maxOccurs="1"/>
            <xs:element name="job-xml" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
            <xs:element name="configuration" type="workflow:CONFIGURATION" minOccurs="0" maxOccurs="1"/>
            <xs:element name="main-class" type="xs:string" minOccurs="1" maxOccurs="1"/>
            <xs:choice minOccurs="0" maxOccurs="1">
                <xs:element name="java-opts" type="xs:string" minOccurs="1" maxOccurs="1"/>
                <xs:element name="java-opt" type="xs:string" minOccurs="1" maxOccurs="unbounded"/>
            </xs:choice>
            <xs:element name="arg" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
            <xs:element name="file" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
            <xs:element name="archive" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
            <xs:element name="capture-output" type="workflow:FLAG" minOccurs="0" maxOccurs="1"/>
        </xs:sequence>
    </xs:complexType>
    <xs:complexType name="FLAG"/>
    <xs:complexType name="CONFIGURATION">
        <xs:sequence>
            <xs:element name="property" minOccurs="1" maxOccurs="unbounded">
                <xs:complexType>
                    <xs:sequence>
                        <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/>
                        <xs:element name="value" minOccurs="1" maxOccurs="1" type="xs:string"/>
                        <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/>
                    </xs:sequence>
                </xs:complexType>
            </xs:element>
        </xs:sequence>
    </xs:complexType>
    <!-- STREAMING definition: mapper; reducer; record-reader; record-reader-mapping; env -->
    <xs:complexType name="STREAMING">
        <xs:sequence>
            <xs:element name="mapper" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <xs:element name="reducer" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <xs:element name="record-reader" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <xs:element name="record-reader-mapping" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
            <xs:element name="env" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
        </xs:sequence>
    </xs:complexType>
    <!-- PIPES definition: map; reduce; inputformat; partitioner; writer; program -->
    <xs:complexType name="PIPES">
        <xs:sequence>
            <xs:element name="map" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <xs:element name="reduce" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <xs:element name="inputformat" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <xs:element name="partitioner" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <xs:element name="writer" type="xs:string" minOccurs="0" maxOccurs="1"/>
            <xs:element name="program" type="xs:string" minOccurs="0" maxOccurs="1"/>
        </xs:sequence>
    </xs:complexType>
    <!-- PREPARE definition: delete; mkdir -->
    <xs:complexType name="PREPARE">
        <xs:sequence>
            <xs:element name="delete" type="workflow:DELETE" minOccurs="0" maxOccurs="unbounded"/>
            <xs:element name="mkdir" type="workflow:MKDIR" minOccurs="0" maxOccurs="unbounded"/>
        </xs:sequence>
    </xs:complexType>
    <xs:complexType name="DELETE">
        <xs:attribute name="path" type="xs:string" use="required"/>
    </xs:complexType>
    <xs:complexType name="MKDIR">
        <xs:attribute name="path" type="xs:string" use="required"/>
    </xs:complexType>
    <xs:complexType name="MOVE">
        <xs:attribute name="source" type="xs:string" use="required"/>
        <xs:attribute name="target" type="xs:string" use="required"/>
    </xs:complexType>
    <xs:complexType name="CHMOD">
        <xs:sequence>
            <xs:element name="recursive" type="workflow:FLAG" minOccurs="0" maxOccurs="1"></xs:element>
        </xs:sequence>
        <xs:attribute name="path" type="xs:string" use="required"/>
        <xs:attribute name="permissions" type="xs:string" use="required"/>
        <xs:attribute name="dir-files" type="xs:string"/>
    </xs:complexType>
    <xs:complexType name="TOUCHZ">
        <xs:attribute name="path" type="xs:string" use="required"/>
    </xs:complexType>
    <xs:complexType name="CHGRP">
        <xs:sequence>
            <xs:element name="recursive" type="workflow:FLAG" minOccurs="0" maxOccurs="1"></xs:element>
        </xs:sequence>
        <xs:attribute name="path" type="xs:string" use="required"/>
        <xs:attribute name="group" type="xs:string" use="required"/>
        <xs:attribute name="dir-files" type="xs:string"/>
    </xs:complexType>
    <xs:complexType name="CREDENTIALS">
        <xs:sequence minOccurs="0" maxOccurs="unbounded">
            <xs:element name="credential" type="workflow:CREDENTIAL"/>
        </xs:sequence>
    </xs:complexType>
    <xs:complexType name="CREDENTIAL">
        <xs:sequence  minOccurs="0" maxOccurs="unbounded" >
                 <xs:element name="property" minOccurs="1" maxOccurs="unbounded">
                    <xs:complexType>
                       <xs:sequence>
                            <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/>
                            <xs:element name="value" minOccurs="1" maxOccurs="1" type="xs:string"/>
                            <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/>
                       </xs:sequence>
                    </xs:complexType>
                 </xs:element>
        </xs:sequence>
        <xs:attribute name="name" type="xs:string" use="required"/>
        <xs:attribute name="type" type="xs:string" use="required"/>
    </xs:complexType>
</xs:schema>
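
As an illustration of the FS type defined in the schema above, a file-system action could look like the following sketch; the action name, paths, and transition targets are hypothetical:

    <action name="cleanup">
        <fs>
            <!-- delete and mkdir take a required path attribute, per the DELETE/MKDIR types -->
            <delete path="hdfs://namenode:8020/usr/foo/temp"/>
            <mkdir path="hdfs://namenode:8020/usr/foo/output"/>
            <!-- chmod requires path and permissions; dir-files is optional -->
            <chmod path="hdfs://namenode:8020/usr/foo/output" permissions="755" dir-files="false"/>
        </fs>
        <ok to="end"/>
        <error to="kill"/>
    </action>

The FS type allows any number of delete, mkdir, move, chmod, touchz, and chgrp operations, executed in document order.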

Appendix 2: WF Example

    <workflow-app name='example-forkjoinwf' xmlns="uri:oozie:workflow:0.1">
        <start to='firstjob' />
        <action name="firstjob">
            <map-reduce>
                <job-tracker>${jobtracker}</job-tracker>
                <name-node>${namenode}</name-node>
                <configuration>
                    <property>
                        <name>mapred.mapper.class</name>
                        <value>org.apache.hadoop.example.IdMapper</value>
                    </property>
                    <property>
                        <name>mapred.reducer.class</name>
                        <value>org.apache.hadoop.example.IdReducer</value>
                    </property>
                    <property>
                        <name>mapred.map.tasks</name>
                        <value>1</value>
                    </property>
                    <property>
                        <name>mapred.input.dir</name>
                        <value>${input}</value>
                    </property>
                    <property>
                        <name>mapred.output.dir</name>
                        <value>/usr/foo/${wf:id()}/temp1</value>
                    </property>
                </configuration>
            </map-reduce>
            <ok to="fork" />
            <error to="kill" />
        </action>
        <fork name='fork'>
            <path start='secondjob' />
            <path start='thirdjob' />
        </fork>
        <action name="secondjob">
            <map-reduce>
                <job-tracker>${jobtracker}</job-tracker>
                <name-node>${namenode}</name-node>
                <configuration>
                    <property>
                        <name>mapred.mapper.class</name>
                        <value>org.apache.hadoop.example.IdMapper</value>
                    </property>
                    <property>
                        <name>mapred.reducer.class</name>
                        <value>org.apache.hadoop.example.IdReducer</value>
                    </property>
                    <property>
                        <name>mapred.map.tasks</name>
                        <value>1</value>
                    </property>
                    <property>
                        <name>mapred.input.dir</name>
                        <value>/usr/foo/${wf:id()}/temp1</value>
                    </property>
                    <property>
                        <name>mapred.output.dir</name>
                        <value>/usr/foo/${wf:id()}/temp2</value>
                    </property>
                </configuration>
            </map-reduce>
            <ok to="join" />
            <error to="kill" />
        </action>
        <action name="thirdjob">
            <map-reduce>
                <job-tracker>${jobtracker}</job-tracker>
                <name-node>${namenode}</name-node>
                <configuration>
                    <property>
                        <name>mapred.mapper.class</name>
                        <value>org.apache.hadoop.example.IdMapper</value>
                    </property>
                    <property>
                        <name>mapred.reducer.class</name>
                        <value>org.apache.hadoop.example.IdReducer</value>
                    </property>
                    <property>
                        <name>mapred.map.tasks</name>
                        <value>1</value>
                    </property>
                    <property>
                        <name>mapred.input.dir</name>
                        <value>/usr/foo/${wf:id()}/temp1</value>
                    </property>
                    <property>
                        <name>mapred.output.dir</name>
                        <value>/usr/foo/${wf:id()}/temp3</value>
                    </property>
                </configuration>
            </map-reduce>
            <ok to="join" />
            <error to="kill" />
        </action>
        <join name='join' to='finaljob'/>
        <action name="finaljob">
            <map-reduce>
                <job-tracker>${jobtracker}</job-tracker>
                <name-node>${namenode}</name-node>
                <configuration>
                    <property>
                        <name>mapred.mapper.class</name>
                        <value>org.apache.hadoop.example.IdMapper</value>
                    </property>
                    <property>
                        <name>mapred.reducer.class</name>
                        <value>org.apache.hadoop.example.IdReducer</value>
                    </property>
                    <property>
                        <name>mapred.map.tasks</name>
                        <value>1</value>
                    </property>
                    <property>
                        <name>mapred.input.dir</name>
                        <value>/usr/foo/${wf:id()}/temp2,/usr/foo/${wf:id()}/temp3</value>
                    </property>
                    <property>
                        <name>mapred.output.dir</name>
                        <value>${output}</value>
                    </property>
                </configuration>
            </map-reduce>
            <ok to="end" />
            <ok to="kill" />
        </action>
        <kill name="kill">
            <message>Map/Reduce failed, error message[${wf:errorMessage()}]</message>
        </kill>
        <end name='end'/>
    </workflow-app>
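
Once a workflow application like the one above is deployed to HDFS, a job is typically submitted with a properties file that binds the formal parameters (${jobtracker}, ${namenode}, ${input}, ${output}), then started through the Oozie command-line tool. A minimal sketch; all host names and paths are hypothetical:

    # job.properties (hypothetical hosts and paths)
    jobtracker=jt-host:8021
    namenode=hdfs://nn-host:8020
    input=/usr/foo/input
    output=/usr/foo/output
    # HDFS directory containing workflow.xml and the lib/ directory with job jars
    oozie.wf.application.path=hdfs://nn-host:8020/usr/foo/apps/example-forkjoinwf

    # submit and start the workflow job in one step
    oozie job -oozie http://oozie-host:11000/oozie -config job.properties -run

The -run option submits the job and immediately moves it from PREP to RUNNING; using -submit instead leaves it in PREP until a separate -start call.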